// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use databend_common_catalog::plan::InvertedIndexInfo;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_exception::Result;
use databend_common_expression::types::F32;
use databend_storages_common_io::ReadSettings;
use opendal::Operator;
use tantivy::query::Query;
use tantivy::query::QueryClone;
use tantivy::query::QueryParser;
use tantivy::schema::Field;
use tantivy::schema::IndexRecordOption;
use tantivy::tokenizer::TokenizerManager;

use crate::io::create_index_schema;
use crate::io::create_tokenizer_manager;
use crate::io::read::InvertedIndexReader;
use crate::io::TableMetaLocationGenerator;
use crate::TableContext;

// Each block file has a corresponding index file,
// the data in the index file is generated by tantivy.
// The index searcher returns matched docIds and scores for the query text.
// If no docId is matched, the corresponding block can be pruned.
//
// ┌────────┐     ┌────────┐   ┌────────┐     ┌────────┐
// │ Index1 │ ... │ IndexM │   │ IndexN │ ... │ IndexZ │
// └────────┘     └────────┘   └────────┘     └────────┘
//     |              |            |              |
//     |              |            |              |
// ┌────────┐     ┌────────┐   ┌────────┐     ┌────────┐
// │ Block1 │ ... │ BlockM │   │ BlockN │ ... │ BlockZ │
// └────────┘     └────────┘   └────────┘     └────────┘
//  \                     /     \                     /
//   \          _________/       \          _________/
//    \        /                  \        /
//     Segment1           ...      SegmentN
//
// Prunes blocks by running an inverted index (tantivy) query against each
// block's index file; blocks with no matching rows can be skipped entirely.
pub struct InvertedIndexPruner {
    // Data access layer used to read index files.
    dal: Operator,
    // I/O settings derived from the table context.
    settings: ReadSettings,
    // Whether a score internal column must be produced for matched rows.
    has_score: bool,
    // Name of the inverted index, used to derive the index file location.
    index_name: String,
    // Version of the inverted index, used to derive the index file location.
    index_version: String,
    // Whether the query contains positional (e.g. phrase) terms and thus
    // requires the position file to be read.
    need_position: bool,
    // Tokenizers matching the ones used when the index was built.
    tokenizer_manager: TokenizerManager,
    // Parsed tantivy query to evaluate against each block's index.
    query: Box<dyn Query>,
    // Ids of the fields referenced by the query.
    field_ids: HashSet<u32>,
    // Posting record granularity (basic / freqs / freqs+positions).
    index_record: IndexRecordOption,
    // Optional Levenshtein distance for fuzzy term matching.
    fuzziness: Option<u8>,
}

impl InvertedIndexPruner {
    /// Try to build an [`InvertedIndexPruner`] from the push-down info.
    ///
    /// Returns `Ok(None)` when the push-down carries no inverted index,
    /// in which case pruning by inverted index does not apply.
    pub fn try_create(
        ctx: &Arc<dyn TableContext>,
        dal: Operator,
        push_down: &Option<PushDownInfo>,
    ) -> Result<Option<Arc<InvertedIndexPruner>>> {
        let Some(index_info) = push_down.as_ref().and_then(|info| info.inverted_index.as_ref())
        else {
            return Ok(None);
        };

        let (query, fuzziness, tokenizer_manager) = create_inverted_index_query(index_info)?;

        // The record option controls what is stored per posting; fall back to
        // frequencies and positions when the option is absent.
        let index_record: IndexRecordOption = match index_info.index_options.get("index_record") {
            Some(value) => serde_json::from_str(value)?,
            None => IndexRecordOption::WithFreqsAndPositions,
        };

        let settings = ReadSettings::from_ctx(ctx)?;

        // Collect the fields referenced by the query, and record whether any
        // term carries a position (such terms need the position file).
        let mut need_position = false;
        let mut field_ids = HashSet::new();
        query.query_terms(&mut |term, has_position| {
            field_ids.insert(term.field().field_id());
            need_position |= has_position;
        });

        Ok(Some(Arc::new(InvertedIndexPruner {
            dal,
            settings,
            // whether we need to generate the score internal column
            has_score: index_info.has_score,
            index_name: index_info.index_name.clone(),
            index_version: index_info.index_version.clone(),
            need_position,
            tokenizer_manager,
            query,
            field_ids,
            index_record,
            fuzziness,
        })))
    }

    /// Run the query against the index file of the block at `block_loc`.
    ///
    /// Returns `None` when no row matches (the block can be pruned),
    /// otherwise the matched row ids paired with their optional scores.
    #[async_backtrace::framed]
    pub async fn should_keep(
        &self,
        block_loc: &str,
        row_count: u64,
    ) -> Result<Option<Vec<(usize, Option<F32>)>>> {
        // Each block file has a sibling index file at a derivable location.
        let index_loc = TableMetaLocationGenerator::gen_inverted_index_location_from_block_location(
            block_loc,
            &self.index_name,
            &self.index_version,
        );

        let reader = InvertedIndexReader::create(
            self.dal.clone(),
            self.need_position,
            self.has_score,
            self.tokenizer_manager.clone(),
            row_count,
        );

        reader
            .do_filter(
                &self.settings,
                self.query.box_clone(),
                &self.field_ids,
                &self.index_record,
                &self.fuzziness,
                &index_loc,
            )
            .await
    }
}

/// Build the tantivy [`Query`] described by the inverted index push-down info,
/// together with the optional fuzziness setting and the tokenizer manager
/// used to parse it.
pub fn create_inverted_index_query(
    inverted_index_info: &InvertedIndexInfo,
) -> Result<(Box<dyn Query>, Option<u8>, TokenizerManager)> {
    // Resolve the queried column names into tantivy fields, remembering any
    // per-field boost values.
    let field_count = inverted_index_info.query_fields.len();
    let mut query_fields = Vec::with_capacity(field_count);
    let mut query_field_boosts = Vec::with_capacity(field_count);
    for (field_name, boost) in &inverted_index_info.query_fields {
        let idx = inverted_index_info.index_schema.index_of(field_name)?;
        let field = Field::from_field_id(idx as u32);
        query_fields.push(field);
        if let Some(boost) = boost {
            query_field_boosts.push((field, boost.0));
        }
    }

    // Build a parser over the index schema; parsing also reveals whether the
    // query has phrase terms that need the position file.
    let (index_schema, _) = create_index_schema(
        Arc::new(inverted_index_info.index_schema.clone()),
        &inverted_index_info.index_options,
    )?;
    let tokenizer_manager = create_tokenizer_manager(&inverted_index_info.index_options);
    let mut query_parser =
        QueryParser::new(index_schema, query_fields.clone(), tokenizer_manager.clone());

    // Apply the optional per-field boosts.
    for (field, boost) in query_field_boosts {
        query_parser.set_field_boost(field, boost);
    }

    // `Option<&_>` is `Copy`, so the push-down options can be read repeatedly.
    let option = inverted_index_info.inverted_index_option.as_ref();

    let fuzziness = option.and_then(|o| o.fuzziness);
    if let Some(distance) = fuzziness {
        // Fuzzy matching: accept terms within the given Levenshtein distance.
        for field in query_fields {
            query_parser.set_field_fuzzy(field, false, distance, true);
        }
    }

    // `operator == true` means `AND`: compose sub-queries as a conjunction.
    if option.map(|o| o.operator).unwrap_or_default() {
        query_parser.set_conjunction_by_default();
    }

    // In lenient mode an invalid query text does not raise an error.
    let query = if option.map(|o| o.lenient).unwrap_or_default() {
        query_parser
            .parse_query_lenient(&inverted_index_info.query_text)
            .0
    } else {
        query_parser.parse_query(&inverted_index_info.query_text)?
    };

    Ok((query, fuzziness, tokenizer_manager))
}
